跳到主要内容

Java IO学习-NIO 与 BIO 各种概念及区别

Stream 与 Channel 区别

stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层) stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用

二者均为全双工,即读写可以同时进行 虽然 Stream 是单向流动的,但是它也是全双工的

IO 模型

同步:线程自己去获取结果(一个线程) 例如:线程调用一个方法后,需要等待方法返回结果

异步:线程自己不去获取结果,而是由其它线程返回结果(至少两个线程) 例如:线程A调用一个方法后,继续向下运行,运行结果由线程B返回

当调用一次 channel.readstream.read 后,会由用户态切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

1、等待数据阶段

2、复制数据阶段:

根据UNIX 网络编程 - 卷 I,IO模型主要有以下几种

阻塞 IO

用户线程进行 read 操作时,需要等待操作系统执行实际的 read 操作,此期间用户线程是被阻塞的,无法执行其他操作

非阻塞 IO

用户线程在一个循环中一直调用 read 方法,若内核空间中还没有数据可读,立即返回 只是在等待阶段非阻塞

用户线程发现内核空间中有数据后,等待内核空间执行复制数据,待复制结束后返回结果

多路复用 ⭐

Java 中通过 Selector 实现多路复用

  • 当没有事件是,调用 select 方法会被阻塞住
  • 一旦有一个或多个事件发生后,就会处理对应的事件,从而实现多路复用

关于 I/O 多路复用(又被称为 “事件驱动”),首先要理解的是,操作系统为你提供了一个功能,当你的某个 socket 可读或者可写的时候,它可以给你一个通知。

这样当配合非阻塞的 socket 使用时,只有当系统通知我哪个描述符可读了,我才去执行 read 操作,可以保证每次 read 都能读到有效数据而不做纯返回 -1 和 EAGAIN 的无用功,写操作类似。

操作系统的这个功能是通过 select/poll/epoll/kqueue 之类的系统调用函数来实现,这些函数都可以同时监视多个描述符的读写就绪状况,这样,多个描述符的 I/O 操作都能在一个线程内并发交替地顺序完成,这就叫 I/O 多路复用。

注意:多路---指的是多个 socket 连接,复用---指的是复用同一个 Redis 处理线程。多路复用主要有三种技术:select,poll,epoll。epoll 是最新的也是目前最好的多路复用技术。

采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络 I/O 的时间消耗),且 Redis 在内存中操作数据的速度非常快,也就是说内存内的操作不会成为影响 Redis 性能的瓶颈,基于这两点 Redis 具有很高的吞吐量。

多路复用与阻塞IO的区别

  • 阻塞 IO 模式下,若线程因 accept 事件被阻塞,发生 read 事件后,仍需等待 accept 事件执行完成后,才能去处理 read 事件
  • 多路复用模式下,一个事件发生后,若另一个事件处于阻塞状态,不会影响该事件的执行

异步IO

  • 线程1调用方法后理解返回,不会被阻塞也不需要立即获取结果
  • 当方法的运行结果出来以后,由线程2将结果返回给线程1

零拷贝 ⭐

零拷贝指的是 数据无需拷贝到 JVM 内存中,同时具有以下三个优点

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输

传统 IO 问题

传统的 IO 将一个文件通过 socket 写出

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

内部工作流如下

Java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU

从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA

调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,CPU 会参与拷贝

接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

用户态与内核态的切换发生了 3 次,这个操作比较重量级 数据拷贝了共 4 次

通过 DirectByteBuf

ByteBuffer.allocate(10)

底层对应 HeapByteBuffer,使用的还是 Java 内存

ByteBuffer.allocateDirect(10)

底层对应 DirectByteBuffer,使用的是操作系统内存,如下:

大部分步骤与优化前相同,唯有一点:Java 可以使用 DirectByteBuffer 将堆外内存映射到 JVM 内存中来直接访问使用

这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写

Java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步 1、DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列

  1. 当引用的对象 ByteBuffer 被垃圾回收以后,虚引用对象 Cleaner 就会被放入引用队列中,然后调用 Cleaner 的 clean 方法来释放直接内存
  2. DirectByteBuffer 的释放底层调用的是 Unsafe 的 freeMemory 方法通过专门线程访问引用队列,根据虚引用释放堆外内存

减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化1

以下两种方式都是零拷贝,即无需将数据拷贝到用户缓冲区中(JVM内存中)

底层采用了 linux 2.1 后提供的 sendFile 方法,Java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

  • Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
  • 数据从内核缓冲区传输到 socket 缓冲区,CPU 会参与拷贝
  • 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

这种方法下

  • 只发生了1次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化2

linux 2.4 对上述方法再次进行了优化

  • Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
  • 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  • 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 CPU

整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了 2 次

AIO 异步 IO

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

文件 AIO

先来看看 AsynchronousFileChannel

@Slf4j
public class AioDemo1 {
public static void main(String[] args) throws IOException {
try{
AsynchronousFileChannel s =
AsynchronousFileChannel.open(
Paths.get("1.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(2);
log.debug("begin...");
s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed...{}", result);
buffer.flip();
debug(buffer);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.debug("read failed...");
}
});

} catch (IOException e) {
e.printStackTrace();
}
log.debug("do other things...");
System.in.read();
}
}

输出

13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0d |a. |
+--------+-------------------------------------------------+----------------+

可以看到

  • 响应文件读取成功的是另一个线程 Thread-5
  • 主线程并没有 IO 操作阻塞

💡 守护线程

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束

网络 AIO

public class AioServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}

private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}

private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}

private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;

public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}

@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}

@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}